package com.ivyft.hive.hadoop;

import com.ivyft.hive.serde2.protobuf.IntLengthHeaderFile;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

/**
 * <pre>
 *
 * Created by IntelliJ IDEA.
 * User: zhenqin
 * Date: 15/4/8
 * Time: 14:44
 * To change this template use File | Settings | File Templates.
 *
 * </pre>
 *
 * @author zhenqin
 */
public class IntLengthHeaderMr1RecordReader implements RecordReader<LongWritable, BytesWritable> {
    protected IntLengthHeaderFile.Reader in;

    protected final AtomicLong counter = new AtomicLong(0);


    protected final int total;


    protected static Log LOG = LogFactory.getLog(IntLengthHeaderMr1RecordReader.class);



    public IntLengthHeaderMr1RecordReader(FileSplit split, JobConf job) throws IOException {
        Path path = split.getPath();
        FileSystem fs = path.getFileSystem(job);

        this.in = new IntLengthHeaderFile.Reader(job, fs, path);
        total = in.available();
    }




    public IntLengthHeaderMr1RecordReader(Path path, Configuration conf) throws IOException {
        FileSystem fs = path.getFileSystem(conf);

        this.in = new IntLengthHeaderFile.Reader(conf, fs, path);
        total = in.available();
    }



    /**
     * Return the progress within the input split
     * @return 0.0 to 1.0 of the input byte range
     */
    @Override
    public float getProgress() throws IOException {
        if(total == 0) {
            return 0.0F;
        }
        return total - in.available() / total;
    }


    @Override
    public boolean next(LongWritable key, BytesWritable value) throws IOException {
        this.counter.addAndGet(1);

        BytesWritable content = null;
        if(in.hasNext()) {
            content = in.next();
        }

        if(content != null) {
            key.set(this.counter.get());
            value.set(content);

            value.setCapacity(content.getLength());
        }


        if(counter.get() % 1000 == 0) {
            LOG.info("process counter: " + counter.get());
        }

        return content != null;
    }

    @Override
    public LongWritable createKey() {
        LongWritable key = new LongWritable();
        return key;
    }

    @Override
    public BytesWritable createValue() {
        BytesWritable v = new BytesWritable();
        return v;
    }

    @Override
    public long getPos() throws IOException {
        return total - in.available();
    }

    @Override
    public synchronized void close() throws IOException {
        in.close();
    }

}
